package com.example.smail.mq;

import com.example.smail.common.Constant;
import com.example.smail.common.MessageHelper;
import com.example.smail.entity.Mail;
import com.example.smail.entity.MsgLog;
import com.example.smail.service.MsgLogService;
import com.example.smail.service.SendMailService;
import com.rabbitmq.client.Channel;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import java.io.IOException;
import java.util.Objects;

/**
 * RabbitMQ consumer for the mail queues.
 *
 * <p>Each listener converts the incoming AMQP message into a {@link Mail} and hands it to
 * {@link SendMailService}. {@link #consumeFromAck} settles deliveries itself on the channel;
 * {@link #consumeFromAuto} leaves settlement to the listener container and tracks the outcome
 * through {@link MsgLogService}.
 *
 * @author panzhi
 * @since 2024/7/8
 */
@Component
public class ConsumerService {

    private static final Logger LOGGER = LoggerFactory.getLogger(ConsumerService.class);

    @Autowired
    private SendMailService sendMailService;

    @Autowired
    private MsgLogService msgLogService;

    /**
     * Consumes from {@code mq.mail.ack} in manual acknowledge mode: this method must settle
     * every delivery itself by calling ack or nack on the channel.
     *
     * <p>Required configuration: {@code spring.rabbitmq.listener.simple.acknowledge-mode=manual}.
     *
     * <p>The delivery is acked when the mail is sent successfully. It is nacked with
     * {@code requeue=true} when sending reports failure <em>or</em> when conversion/sending
     * throws, so a delivery is never left unsettled on the channel.
     *
     * @param message the raw AMQP message carrying the mail payload
     * @param channel the channel the message was delivered on, used to ack/nack
     * @throws IOException if the ack/nack call on the channel fails
     */
    @RabbitListener(queues = {"mq.mail.ack"})
    public void consumeFromAck(Message message, Channel channel) throws IOException {
        // Pass the object itself; SLF4J only calls toString() when INFO is enabled.
        LOGGER.info("收到消息：{}", message);

        long tag = message.getMessageProperties().getDeliveryTag();

        boolean success;
        try {
            // Convert the message payload into a Mail object and send it.
            Mail mail = MessageHelper.msgToObj(message, Mail.class);
            success = sendMailService.send(mail);
        } catch (RuntimeException e) {
            // In manual mode nothing else settles the delivery, so an escaping exception
            // would leave it unacked; treat it like a failed send instead.
            LOGGER.error("Failed to process message, deliveryTag={}", tag, e);
            success = false;
        }

        if (success) {
            // Consumed successfully: the broker removes the message.
            channel.basicAck(tag, false);
        } else {
            // Consumption failed: put the message back on the queue.
            // NOTE(review): with requeue=true a message that can never succeed is redelivered
            // indefinitely — consider a retry limit or dead-letter queue.
            channel.basicNack(tag, false, true);
        }
    }

    /**
     * Consumes from {@code mq.mail.auto} in automatic acknowledge mode: no ack/nack call is
     * made here; settlement is left to the listener container.
     *
     * <p>Required configuration: {@code spring.rabbitmq.listener.simple.acknowledge-mode=auto}.
     *
     * <p>Processing is idempotent per message id: if the message log already has status
     * {@code Constant.SUCCESS} the mail is not sent again. Otherwise the mail is sent and the
     * log status is updated to SUCCESS or FAIL. If sending throws, FAIL is recorded and the
     * exception is rethrown unchanged.
     *
     * @param message the raw AMQP message carrying the mail payload
     */
    @RabbitListener(queues = {"mq.mail.auto"})
    public void consumeFromAuto(Message message) {
        // Pass the object itself; SLF4J only calls toString() when INFO is enabled.
        LOGGER.info("收到消息：{}", message);

        // Convert the message payload into a Mail object (it carries the message id).
        Mail mail = MessageHelper.msgToObj(message, Mail.class);

        // Idempotency guard: a message already processed successfully is not consumed again.
        MsgLog queryObj = msgLogService.selectByMsgId(mail.getMsgId());
        if (Objects.nonNull(queryObj) && Constant.SUCCESS.equals(queryObj.getStatus())) {
            return;
        }

        // Send the mail.
        boolean success;
        try {
            success = sendMailService.send(mail);
        } catch (RuntimeException e) {
            // Record the failure so the log does not keep a stale status, then rethrow so the
            // container's handling of the delivery stays exactly as before.
            LOGGER.error("Mail sending threw an exception, msgId={}", mail.getMsgId(), e);
            msgLogService.updateStatus(mail.getMsgId(), Constant.FAIL, "邮件发送失败");
            throw e;
        }

        if (success) {
            msgLogService.updateStatus(mail.getMsgId(), Constant.SUCCESS, "邮件发送成功");
        } else {
            msgLogService.updateStatus(mail.getMsgId(), Constant.FAIL, "邮件发送失败");
        }
    }
}
